package top.ply.messageservice.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
import top.ply.message.service.EmailMsgService;
import top.ply.message.service.TelephoneMsgService;
import top.ply.messageservice.mq.SendVerifyCodeQueue;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * Consumes "send verify code" messages from the e-mail and phone queues and
 * delivers each code asynchronously on a dedicated thread pool, retrying a
 * failed delivery up to {@link #MAX_RETRY_TIMES} times.
 *
 * <p>NOTE(review): this class carries no stereotype or binding annotation in
 * this file, so whether its listeners are actually registered depends on
 * configuration elsewhere - confirm before relying on it.
 *
 * @deprecated flagged as deprecated by its authors; no replacement is named in this file.
 */
@Deprecated
public class VerifyCodeConsumer {

    Logger logger = LoggerFactory.getLogger(VerifyCodeConsumer.class);

    @Autowired
    EmailMsgService emailMsgService;

    @Autowired
    TelephoneMsgService telephoneMsgService;

    /** Pool on which e-mail verify codes are sent. */
    private static final ExecutorService EMAIL_CODE_THREAD_POOL = Executors.newCachedThreadPool();

    /** Pool on which phone verify codes are sent. */
    private static final ExecutorService PHONE_CODE_THREAD_POOL = Executors.newCachedThreadPool();

    /** Number of retries allowed after the first failed delivery attempt. */
    public static final Integer MAX_RETRY_TIMES = 3;

    /**
     * Handles a message from the e-mail code queue.
     *
     * @param mailAndCode payload; the {@code "email"} entry is the recipient and the
     *                    {@code "code"} entry is the verify code. A {@code null} payload
     *                    or a payload without {@code "email"} is ignored.
     */
    @StreamListener(SendVerifyCodeQueue.EMAIL_CODE_QUEUE)
    public void sendEmailCode(Map<String, String> mailAndCode) {
        if (mailAndCode == null) {
            return;
        }
        final String email = mailAndCode.get("email");
        if (email == null) {
            return;
        }
        // Read the code now so the async task does not depend on the payload map later.
        final String code = mailAndCode.get("code");

        logger.info("Push send verify code to {} to EMAIL_CODE_THREAD_POOL", email);

        EMAIL_CODE_THREAD_POOL.execute(
                () -> sendWithRetry("email", email, () -> emailMsgService.sendVerifyCode(email, code)));
    }

    /**
     * Handles a message from the phone code queue.
     *
     * @param phoneAndCode payload; the {@code "phone"} entry is the recipient and the
     *                     {@code "code"} entry is the verify code. A {@code null} payload
     *                     or a payload without {@code "phone"} is ignored.
     */
    @StreamListener(SendVerifyCodeQueue.PHONE_CODE_QUEUE)
    public void sendTelephoneCode(Map<String, String> phoneAndCode) {
        if (phoneAndCode == null) {
            return;
        }
        final String phone = phoneAndCode.get("phone");
        if (phone == null) {
            return;
        }
        // Read the code now so the async task does not depend on the payload map later.
        final String code = phoneAndCode.get("code");

        logger.info("Push send verify code to {} to PHONE_CODE_THREAD_POOL", phone);

        PHONE_CODE_THREAD_POOL.execute(
                () -> sendWithRetry("phone", phone, () -> telephoneMsgService.sendVerifyCode(phone, code)));
    }

    /**
     * Runs {@code send} until it succeeds or the retry budget is exhausted.
     *
     * <p>The action is attempted once and then retried up to {@link #MAX_RETRY_TIMES}
     * more times. Failures are caught here, inside the pool task, because an
     * exception escaping a task handed to an executor never reaches a
     * {@code Thread.UncaughtExceptionHandler} set on some other thread object.
     * Retries are immediate; no back-off is applied.
     *
     * @param channel label used in log messages (e.g. {@code "email"})
     * @param target  recipient address or number, used in log messages
     * @param send    the delivery action to execute
     */
    private void sendWithRetry(String channel, String target, Runnable send) {
        final int maxAttempts = MAX_RETRY_TIMES + 1;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                send.run();
                return;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    // Trailing throwable argument makes SLF4J log the full stack trace.
                    logger.error("Giving up sending {} verify code to {} after {} attempts",
                            channel, target, attempt, e);
                } else {
                    logger.warn("Sending {} verify code to {} failed on attempt {} of {}, retrying: {}",
                            channel, target, attempt, maxAttempts, e.toString());
                }
            }
        }
    }

}
